<!DOCTYPE html>
<html lang="zh-CN">
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width,initial-scale=1">
    <title>5. Kafka API（Java中的Kafka使用） | JavaKeeper</title>
    <meta name="generator" content="VuePress 1.5.4">
    <link rel="icon" href="/icon.svg">
    <script>
        // Global array, reused if something already defined it
        // (presumably consumed by the Baidu script loaded below — confirm).
        var _hmt = _hmt || [];
        (function () {
            // Build a <script> element for `url` and insert it in front of
            // whichever <script> is currently first in the document. The lookup
            // is repeated on every call, so each new tag lands ahead of the
            // previously injected one.
            function prependScript(url) {
                var tag = document.createElement("script");
                tag.src = url;
                var first = document.getElementsByTagName("script")[0];
                first.parentNode.insertBefore(tag, first);
            }

            // Baidu analytics loader.
            prependScript("https://hm.baidu.com/hm.js?a949a9b30eb86ac0159e735ff8670c03");
            // Google tag loader; remove this call if Google tracking is not needed.
            prependScript("https://www.googletagmanager.com/gtag/js?id=UA-169923503-1");
        })();

        // Google tag setup; remove this section if Google tracking is not needed.
        window.dataLayer = window.dataLayer || [];
        // Pushes the raw `arguments` object onto dataLayer.
        function gtag() {
            dataLayer.push(arguments);
        }
        gtag('js', new Date());
        gtag('config', 'UA-169923503-1');
    </script>
    <meta name="description" content="">
    <meta name="keywords" content="JavaKeeper,Java,Java开发,算法,blog">
    <link rel="preload" href="/assets/css/0.styles.91f57736.css" as="style"><link rel="preload" href="/assets/js/app.447d4224.js" as="script"><link rel="preload" href="/assets/js/3.9d76740c.js" as="script"><link rel="preload" href="/assets/js/1.c4fd7d2e.js" as="script"><link rel="preload" href="/assets/js/151.971fdf4b.js" as="script"><link rel="prefetch" href="/assets/js/10.8cf3be2c.js"><link rel="prefetch" href="/assets/js/100.74f35ab8.js"><link rel="prefetch" href="/assets/js/101.7a062346.js"><link rel="prefetch" href="/assets/js/102.c9485235.js"><link rel="prefetch" href="/assets/js/103.d88a3805.js"><link rel="prefetch" href="/assets/js/104.6e034144.js"><link rel="prefetch" href="/assets/js/105.d22f7450.js"><link rel="prefetch" href="/assets/js/106.a6cb54b0.js"><link rel="prefetch" href="/assets/js/107.7b65e72b.js"><link rel="prefetch" href="/assets/js/108.eb5804bb.js"><link rel="prefetch" href="/assets/js/109.05f775e5.js"><link rel="prefetch" href="/assets/js/11.c54ae13c.js"><link rel="prefetch" href="/assets/js/110.51d3d641.js"><link rel="prefetch" href="/assets/js/111.022b64a7.js"><link rel="prefetch" href="/assets/js/112.da8afd52.js"><link rel="prefetch" href="/assets/js/113.05a17b18.js"><link rel="prefetch" href="/assets/js/114.8960d913.js"><link rel="prefetch" href="/assets/js/115.67919f68.js"><link rel="prefetch" href="/assets/js/116.62b0cd71.js"><link rel="prefetch" href="/assets/js/117.ebac3eff.js"><link rel="prefetch" href="/assets/js/118.ecd629bd.js"><link rel="prefetch" href="/assets/js/119.a09a0897.js"><link rel="prefetch" href="/assets/js/12.60aa3b24.js"><link rel="prefetch" href="/assets/js/120.bf639d3d.js"><link rel="prefetch" href="/assets/js/121.b89d0c8e.js"><link rel="prefetch" href="/assets/js/122.1a75ff83.js"><link rel="prefetch" href="/assets/js/123.d2127132.js"><link rel="prefetch" href="/assets/js/124.2caff9e0.js"><link rel="prefetch" href="/assets/js/125.9b9f966a.js"><link rel="prefetch" href="/assets/js/126.58cdfb3d.js"><link 
rel="prefetch" href="/assets/js/127.8ef09c53.js"><link rel="prefetch" href="/assets/js/128.efdc2ae4.js"><link rel="prefetch" href="/assets/js/129.e35cbc57.js"><link rel="prefetch" href="/assets/js/13.125c13a0.js"><link rel="prefetch" href="/assets/js/130.f01a55e3.js"><link rel="prefetch" href="/assets/js/131.65205f4a.js"><link rel="prefetch" href="/assets/js/132.f42c5a0a.js"><link rel="prefetch" href="/assets/js/133.9ba468b3.js"><link rel="prefetch" href="/assets/js/134.7b597ba9.js"><link rel="prefetch" href="/assets/js/135.fb828b9a.js"><link rel="prefetch" href="/assets/js/136.3887532f.js"><link rel="prefetch" href="/assets/js/137.549bae01.js"><link rel="prefetch" href="/assets/js/138.db8d423d.js"><link rel="prefetch" href="/assets/js/139.dbaf2267.js"><link rel="prefetch" href="/assets/js/14.bd1d0b0d.js"><link rel="prefetch" href="/assets/js/140.6cb65fdc.js"><link rel="prefetch" href="/assets/js/141.9bd6cc4b.js"><link rel="prefetch" href="/assets/js/142.552db5ed.js"><link rel="prefetch" href="/assets/js/143.2c9f2bf4.js"><link rel="prefetch" href="/assets/js/144.fba98a15.js"><link rel="prefetch" href="/assets/js/145.c42f3a21.js"><link rel="prefetch" href="/assets/js/146.596d4d33.js"><link rel="prefetch" href="/assets/js/147.c48ae5c1.js"><link rel="prefetch" href="/assets/js/148.71064871.js"><link rel="prefetch" href="/assets/js/149.16582d21.js"><link rel="prefetch" href="/assets/js/15.f247873b.js"><link rel="prefetch" href="/assets/js/150.ead09aca.js"><link rel="prefetch" href="/assets/js/152.369c9362.js"><link rel="prefetch" href="/assets/js/153.371edd15.js"><link rel="prefetch" href="/assets/js/154.e090b491.js"><link rel="prefetch" href="/assets/js/155.c68bf602.js"><link rel="prefetch" href="/assets/js/156.304aea8d.js"><link rel="prefetch" href="/assets/js/157.83beef7f.js"><link rel="prefetch" href="/assets/js/158.bb1794b0.js"><link rel="prefetch" href="/assets/js/159.2d54e792.js"><link rel="prefetch" href="/assets/js/16.04336c71.js"><link rel="prefetch" 
href="/assets/js/160.99d56586.js"><link rel="prefetch" href="/assets/js/161.edf660aa.js"><link rel="prefetch" href="/assets/js/162.0b84606e.js"><link rel="prefetch" href="/assets/js/163.b59e0d60.js"><link rel="prefetch" href="/assets/js/164.d9eb8228.js"><link rel="prefetch" href="/assets/js/165.ca624c79.js"><link rel="prefetch" href="/assets/js/166.025b2ba1.js"><link rel="prefetch" href="/assets/js/167.abc982cc.js"><link rel="prefetch" href="/assets/js/168.27ca13dc.js"><link rel="prefetch" href="/assets/js/169.41e753a2.js"><link rel="prefetch" href="/assets/js/17.43b3c1c8.js"><link rel="prefetch" href="/assets/js/170.626319e1.js"><link rel="prefetch" href="/assets/js/171.a221dddf.js"><link rel="prefetch" href="/assets/js/172.464b2361.js"><link rel="prefetch" href="/assets/js/173.96a3afee.js"><link rel="prefetch" href="/assets/js/174.116607c2.js"><link rel="prefetch" href="/assets/js/175.ea3e8659.js"><link rel="prefetch" href="/assets/js/176.7d7b8afc.js"><link rel="prefetch" href="/assets/js/177.a6e00aa0.js"><link rel="prefetch" href="/assets/js/178.1f93afaf.js"><link rel="prefetch" href="/assets/js/179.3aa00dcd.js"><link rel="prefetch" href="/assets/js/18.d81b44d5.js"><link rel="prefetch" href="/assets/js/180.f8b2b75a.js"><link rel="prefetch" href="/assets/js/181.8e11258a.js"><link rel="prefetch" href="/assets/js/182.22243941.js"><link rel="prefetch" href="/assets/js/183.d051fdf6.js"><link rel="prefetch" href="/assets/js/184.a994075e.js"><link rel="prefetch" href="/assets/js/185.776c7e16.js"><link rel="prefetch" href="/assets/js/186.f1887955.js"><link rel="prefetch" href="/assets/js/187.da0d3626.js"><link rel="prefetch" href="/assets/js/188.8dfc358f.js"><link rel="prefetch" href="/assets/js/189.dcac5a59.js"><link rel="prefetch" href="/assets/js/19.1b3d66e1.js"><link rel="prefetch" href="/assets/js/190.c7e413d0.js"><link rel="prefetch" href="/assets/js/191.d9806121.js"><link rel="prefetch" href="/assets/js/192.869791da.js"><link rel="prefetch" 
href="/assets/js/193.2d74c4c8.js"><link rel="prefetch" href="/assets/js/194.c73a1909.js"><link rel="prefetch" href="/assets/js/195.e8c74834.js"><link rel="prefetch" href="/assets/js/20.bd5949ec.js"><link rel="prefetch" href="/assets/js/21.3fcf98cf.js"><link rel="prefetch" href="/assets/js/22.2fa1e2e8.js"><link rel="prefetch" href="/assets/js/23.1ae64bb4.js"><link rel="prefetch" href="/assets/js/24.7bdf7387.js"><link rel="prefetch" href="/assets/js/25.392c436e.js"><link rel="prefetch" href="/assets/js/26.58acbd4b.js"><link rel="prefetch" href="/assets/js/27.c725bdd5.js"><link rel="prefetch" href="/assets/js/28.6c9bda1e.js"><link rel="prefetch" href="/assets/js/29.e656b537.js"><link rel="prefetch" href="/assets/js/30.2c326fc7.js"><link rel="prefetch" href="/assets/js/31.e6c9fa30.js"><link rel="prefetch" href="/assets/js/32.c9c88437.js"><link rel="prefetch" href="/assets/js/33.0c53373c.js"><link rel="prefetch" href="/assets/js/34.9821e543.js"><link rel="prefetch" href="/assets/js/35.de8253eb.js"><link rel="prefetch" href="/assets/js/36.d182f929.js"><link rel="prefetch" href="/assets/js/37.9fa79014.js"><link rel="prefetch" href="/assets/js/38.9bebff76.js"><link rel="prefetch" href="/assets/js/39.19a3a2d4.js"><link rel="prefetch" href="/assets/js/4.564edb9d.js"><link rel="prefetch" href="/assets/js/40.cca6955f.js"><link rel="prefetch" href="/assets/js/41.854cd09a.js"><link rel="prefetch" href="/assets/js/42.ca7b612f.js"><link rel="prefetch" href="/assets/js/43.87027d58.js"><link rel="prefetch" href="/assets/js/44.8c2b4f4b.js"><link rel="prefetch" href="/assets/js/45.dffb4e08.js"><link rel="prefetch" href="/assets/js/46.f58049a5.js"><link rel="prefetch" href="/assets/js/47.6854070c.js"><link rel="prefetch" href="/assets/js/48.6cd9fa3d.js"><link rel="prefetch" href="/assets/js/49.e8861afa.js"><link rel="prefetch" href="/assets/js/5.5c31d62f.js"><link rel="prefetch" href="/assets/js/50.703bffab.js"><link rel="prefetch" href="/assets/js/51.6655c373.js"><link rel="prefetch" 
href="/assets/js/52.deb2eb09.js"><link rel="prefetch" href="/assets/js/53.6e0ed77d.js"><link rel="prefetch" href="/assets/js/54.b05c58ad.js"><link rel="prefetch" href="/assets/js/55.49c8164e.js"><link rel="prefetch" href="/assets/js/56.a5574e6b.js"><link rel="prefetch" href="/assets/js/57.58cb0de4.js"><link rel="prefetch" href="/assets/js/58.52345112.js"><link rel="prefetch" href="/assets/js/59.663ce78d.js"><link rel="prefetch" href="/assets/js/6.a9df34ee.js"><link rel="prefetch" href="/assets/js/60.f06adde2.js"><link rel="prefetch" href="/assets/js/61.170255a1.js"><link rel="prefetch" href="/assets/js/62.9d120050.js"><link rel="prefetch" href="/assets/js/63.70cced6b.js"><link rel="prefetch" href="/assets/js/64.577f3548.js"><link rel="prefetch" href="/assets/js/65.c037b29d.js"><link rel="prefetch" href="/assets/js/66.7dd1045f.js"><link rel="prefetch" href="/assets/js/67.d3aa4d6c.js"><link rel="prefetch" href="/assets/js/68.526dbb61.js"><link rel="prefetch" href="/assets/js/69.58269266.js"><link rel="prefetch" href="/assets/js/7.6609d4d6.js"><link rel="prefetch" href="/assets/js/70.64108f1b.js"><link rel="prefetch" href="/assets/js/71.1e95e0a6.js"><link rel="prefetch" href="/assets/js/72.42e7ec94.js"><link rel="prefetch" href="/assets/js/73.dad4e1c5.js"><link rel="prefetch" href="/assets/js/74.28ea286a.js"><link rel="prefetch" href="/assets/js/75.dd6d4c6f.js"><link rel="prefetch" href="/assets/js/76.ca6539df.js"><link rel="prefetch" href="/assets/js/77.feb13b0e.js"><link rel="prefetch" href="/assets/js/78.321e90e6.js"><link rel="prefetch" href="/assets/js/79.68eb8fcf.js"><link rel="prefetch" href="/assets/js/8.396d51fd.js"><link rel="prefetch" href="/assets/js/80.4edb5321.js"><link rel="prefetch" href="/assets/js/81.735d7e57.js"><link rel="prefetch" href="/assets/js/82.fa120bdf.js"><link rel="prefetch" href="/assets/js/83.bf755f94.js"><link rel="prefetch" href="/assets/js/84.9b32070c.js"><link rel="prefetch" href="/assets/js/85.592aca7c.js"><link rel="prefetch" 
href="/assets/js/86.4dcd9e73.js"><link rel="prefetch" href="/assets/js/87.a9e546aa.js"><link rel="prefetch" href="/assets/js/88.2a423212.js"><link rel="prefetch" href="/assets/js/89.5f455115.js"><link rel="prefetch" href="/assets/js/9.adb074c6.js"><link rel="prefetch" href="/assets/js/90.5202da0a.js"><link rel="prefetch" href="/assets/js/91.02cee99d.js"><link rel="prefetch" href="/assets/js/92.f16bad0b.js"><link rel="prefetch" href="/assets/js/93.f933634f.js"><link rel="prefetch" href="/assets/js/94.8e7b1d65.js"><link rel="prefetch" href="/assets/js/95.ee0e4a0a.js"><link rel="prefetch" href="/assets/js/96.e21d78c2.js"><link rel="prefetch" href="/assets/js/97.c87e514e.js"><link rel="prefetch" href="/assets/js/98.d123ac92.js"><link rel="prefetch" href="/assets/js/99.92d1b416.js">
    <link rel="stylesheet" href="/assets/css/0.styles.91f57736.css">
  </head>
  <body>
    <div id="app" data-server-rendered="true"><div class="theme-container no-sidebar" data-v-3ba18f14><div data-v-3ba18f14><div id="loader-wrapper" class="loading-wrapper" data-v-041fef5b data-v-3ba18f14 data-v-3ba18f14><div class="loader-main" data-v-041fef5b><div data-v-041fef5b></div><div data-v-041fef5b></div><div data-v-041fef5b></div><div data-v-041fef5b></div></div> <!----> <!----></div> <div class="password-shadow password-wrapper-out" style="display:none;" data-v-68139a52 data-v-3ba18f14 data-v-3ba18f14><h3 class="title" style="display:none;" data-v-68139a52 data-v-68139a52>JavaKeeper</h3> <!----> <label id="box" class="inputBox" style="display:none;" data-v-68139a52 data-v-68139a52><input type="password" value="" data-v-68139a52> <span data-v-68139a52>Konck! Knock!</span> <button data-v-68139a52>OK</button></label> <div class="footer" style="display:none;" data-v-68139a52 data-v-68139a52><span data-v-68139a52><i class="iconfont reco-theme" data-v-68139a52></i> <a target="blank" href="https://vuepress-theme-reco.recoluan.com" data-v-68139a52>vuePress-theme-reco</a></span> <span data-v-68139a52><i class="iconfont reco-copyright" data-v-68139a52></i> <a data-v-68139a52><span data-v-68139a52>海星</span>
            
          <!---->
          2020
        </a></span></div></div> <div class="hide" data-v-3ba18f14><header class="navbar" data-v-3ba18f14><div class="sidebar-button"><svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" role="img" viewBox="0 0 448 512" class="icon"><path fill="currentColor" d="M436 124H12c-6.627 0-12-5.373-12-12V80c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12z"></path></svg></div> <a href="/" class="home-link router-link-active"><!----> <span class="site-name">JavaKeeper</span></a> <div class="links"><div class="color-picker"><a class="color-button"><i class="iconfont reco-color"></i></a> <div class="color-picker-menu" style="display:none;"><div class="mode-options"><h4 class="title">Choose mode</h4> <ul class="color-mode-options"><li class="dark">dark</li><li class="auto active">auto</li><li class="light">light</li></ul></div></div></div> <div class="search-box"><i class="iconfont reco-search"></i> <input aria-label="Search" autocomplete="off" spellcheck="false" value=""> <!----></div> <nav class="nav-links can-hide"><div class="nav-item"><a href="/java/" class="nav-link"><i class="iconfont undefined"></i>
  Java
</a></div><div class="nav-item"><a href="/data-structure-algorithms/" class="nav-link"><i class="iconfont undefined"></i>
  数据结构与算法
</a></div><div class="nav-item"><a href="/data-store/" class="nav-link"><i class="iconfont undefined"></i>
  数据存储与缓存
</a></div><div class="nav-item"><a href="/interview/" class="nav-link"><i class="iconfont undefined"></i>
  直击面试
</a></div> <a href="https://github.com/Jstarfish/JavaKeeper" target="_blank" rel="noopener noreferrer" class="repo-link"><i class="iconfont reco-github"></i>
    GitHub
    <svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></nav></div></header> <div class="sidebar-mask" data-v-3ba18f14></div> <aside class="sidebar" data-v-3ba18f14><div class="personal-info-wrapper" data-v-5f6acefd data-v-3ba18f14><!----> <h3 class="name" data-v-5f6acefd>
    海星
  </h3> <div class="num" data-v-5f6acefd><div data-v-5f6acefd><h3 data-v-5f6acefd>0</h3> <h6 data-v-5f6acefd>Article</h6></div> <div data-v-5f6acefd><h3 data-v-5f6acefd>0</h3> <h6 data-v-5f6acefd>Tag</h6></div></div> <hr data-v-5f6acefd></div> <nav class="nav-links"><div class="nav-item"><a href="/java/" class="nav-link"><i class="iconfont undefined"></i>
  Java
</a></div><div class="nav-item"><a href="/data-structure-algorithms/" class="nav-link"><i class="iconfont undefined"></i>
  数据结构与算法
</a></div><div class="nav-item"><a href="/data-store/" class="nav-link"><i class="iconfont undefined"></i>
  数据存储与缓存
</a></div><div class="nav-item"><a href="/interview/" class="nav-link"><i class="iconfont undefined"></i>
  直击面试
</a></div> <a href="https://github.com/Jstarfish/JavaKeeper" target="_blank" rel="noopener noreferrer" class="repo-link"><i class="iconfont reco-github"></i>
    GitHub
    <svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></nav> <!----> </aside> <div class="password-shadow password-wrapper-in" style="display:none;" data-v-68139a52 data-v-3ba18f14><h3 class="title" style="display:none;" data-v-68139a52 data-v-68139a52></h3> <!----> <label id="box" class="inputBox" style="display:none;" data-v-68139a52 data-v-68139a52><input type="password" value="" data-v-68139a52> <span data-v-68139a52>Konck! Knock!</span> <button data-v-68139a52>OK</button></label> <div class="footer" style="display:none;" data-v-68139a52 data-v-68139a52><span data-v-68139a52><i class="iconfont reco-theme" data-v-68139a52></i> <a target="blank" href="https://vuepress-theme-reco.recoluan.com" data-v-68139a52>vuePress-theme-reco</a></span> <span data-v-68139a52><i class="iconfont reco-copyright" data-v-68139a52></i> <a data-v-68139a52><span data-v-68139a52>海星</span>
            
          <!---->
          2020
        </a></span></div></div> <div data-v-3ba18f14><main class="page"><div class="page-title" style="display:none;"><h1 class="title">5. Kafka API（Java中的Kafka使用）</h1> <div data-v-5d8dbdb4><i class="iconfont reco-account" data-v-5d8dbdb4><span data-v-5d8dbdb4>海星</span></i> <!----> <!----> <!----></div></div> <div class="theme-reco-content content__default" style="display:none;"><h2 id="_5-kafka-api-java中的kafka使用"><a href="#_5-kafka-api-java中的kafka使用" class="header-anchor">#</a> 5. Kafka API（Java中的Kafka使用）</h2> <h3 id="_5-1-启动zk和kafka集群"><a href="#_5-1-启动zk和kafka集群" class="header-anchor">#</a> 5.1 启动zk和kafka集群</h3> <p><img src="H:/Technical-Learning/docs/_images/message-queue/Kafka/kafka-start.png" alt="img"></p> <h3 id="_5-2-导入-pom-依赖"><a href="#_5-2-导入-pom-依赖" class="header-anchor">#</a> 5.2 导入 pom 依赖</h3> <div class="language- extra-class"><pre class="language-text"><code>&lt;dependency&gt;
    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
    &lt;artifactId&gt;kafka_2.12&lt;/artifactId&gt;
    &lt;version&gt;2.1.1&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
    &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
    &lt;version&gt;2.1.1&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div><h3 id="_5-3-创建生产者"><a href="#_5-3-创建生产者" class="header-anchor">#</a> 5.3 创建生产者</h3> <p>Kafka的Producer发送消息采用的是<strong>异步发送</strong>的方式。在消息发送过程中，涉及到了两个线程：<strong>main线程和Sender线程</strong>，以及一个线程共享变量：<strong>RecordAccumulator</strong>。main线程将消息发送给RecordAccumulator，Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。</p> <p><img src="H:/Technical-Learning/docs/_images/message-queue/Kafka/kafka-producer-thread.png" alt="img"></p> <p>相关参数：</p> <ul><li><strong>batch.size</strong>：只有数据积累到batch.size之后，sender才会发送数据。</li> <li><strong>linger.ms</strong>：如果数据迟迟未达到batch.size，sender等待linger.ms之后就会发送数据。</li></ul> <h4 id="_5-3-1-异步发送api"><a href="#_5-3-1-异步发送api" class="header-anchor">#</a> 5.3.1 异步发送API</h4> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>producer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Properties</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">KafkaProducer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">Producer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerRecord</span><span class="token punctuation">;</span>

<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">CustomerProducer</span> <span class="token punctuation">{</span>
    <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token class-name">Properties</span> properties <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Properties</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// Kafka 服务端的主机名和端口号</span>
        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;bootstrap.servers&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;39.121.214.96:9092&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 等待所有副本节点的应答</span>
        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;acks&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;all&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 消息发送最大尝试次数</span>
        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;retries&quot;</span><span class="token punctuation">,</span> <span class="token number">0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 一批消息处理大小</span>
        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;batch.size&quot;</span><span class="token punctuation">,</span> <span class="token number">16384</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 请求延时</span>
        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;linger.ms&quot;</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 发送缓存区内存大小</span>
        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;buffer.memory&quot;</span><span class="token punctuation">,</span> <span class="token number">33554432</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// key 序列化</span>
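        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;key.serializer&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// value 序列化</span>
        properties<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;value.serializer&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>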
        <span class="token class-name">Producer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> producer <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">KafkaProducer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span>properties<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">int</span> i <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span> i <span class="token operator">&lt;</span> <span class="token number">50</span><span class="token punctuation">;</span> i<span class="token operator">++</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
            producer<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token keyword">new</span> <span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span><span class="token string">&quot;learn-java-kafka&quot;</span><span class="token punctuation">,</span>
                    <span class="token class-name">Integer</span><span class="token punctuation">.</span><span class="token function">toString</span><span class="token punctuation">(</span>i<span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token string">&quot;hello world-&quot;</span> <span class="token operator">+</span> i<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>
        producer<span class="token punctuation">.</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div><h4 id="_5-3-2-异步发送-带回调函数"><a href="#_5-3-2-异步发送-带回调函数" class="header-anchor">#</a> 5.3.2  异步发送，带回调函数</h4> <p>回调函数会在 producer 收到 ack 时调用，为异步调用，该方法有两个参数，分别是 RecordMetadata 和 Exception，如果 Exception 为 null，说明消息发送成功，如果 Exception 不为 null，说明消息发送失败。 注意：消息发送失败会自动重试，不需要我们在回调函数中手动重试。</p> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>producer</span><span class="token punctuation">;</span>

<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">Callback</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">KafkaProducer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerRecord</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">RecordMetadata</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Properties</span><span class="token punctuation">;</span>

<span class="token comment">/**
 * @date: 2019/9/9 16:11
 * @description: 创建生产者带回调函数（新 API）
 */</span>
  <span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">CallBackProducer</span> <span class="token punctuation">{</span>

  <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
      <span class="token class-name">Properties</span> props <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Properties</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;bootstrap.servers&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;10.121.214.96:9092&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token comment">// 等待所有副本节点的应答</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;acks&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;all&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token comment">// 消息发送最大尝试次数</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;retries&quot;</span><span class="token punctuation">,</span> <span class="token number">0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token comment">// 一批消息处理大小</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;batch.size&quot;</span><span class="token punctuation">,</span> <span class="token number">16384</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token comment">// 增加服务端请求延时</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;linger.ms&quot;</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token comment">// 发送缓存区内存大小</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;buffer.memory&quot;</span><span class="token punctuation">,</span> <span class="token number">33554432</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token comment">// key 序列化</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;key.serializer&quot;</span><span class="token punctuation">,</span>
              <span class="token string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token comment">// value 序列化</span>
      props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;value.serializer&quot;</span><span class="token punctuation">,</span>
              <span class="token string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token class-name">KafkaProducer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> kafkaProducer <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">KafkaProducer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span>props<span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">int</span> i <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span> i <span class="token operator">&lt;</span> <span class="token number">50</span><span class="token punctuation">;</span> i<span class="token operator">++</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
          kafkaProducer<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token keyword">new</span> <span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span><span class="token string">&quot;learn-java-kafka&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;hello&quot;</span>
                  <span class="token operator">+</span> i<span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token keyword">new</span> <span class="token class-name">Callback</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
              <span class="token annotation punctuation">@Override</span>
              <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">onCompletion</span><span class="token punctuation">(</span><span class="token class-name">RecordMetadata</span> metadata<span class="token punctuation">,</span> <span class="token class-name">Exception</span>
                      exception<span class="token punctuation">)</span> <span class="token punctuation">{</span>
                  <span class="token keyword">if</span> <span class="token punctuation">(</span>metadata <span class="token operator">!=</span> <span class="token keyword">null</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
                      <span class="token class-name">System</span><span class="token punctuation">.</span>err<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span>metadata<span class="token punctuation">.</span><span class="token function">partition</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">+</span> <span class="token string">&quot;---&quot;</span> <span class="token operator">+</span>
                              metadata<span class="token punctuation">.</span><span class="token function">offset</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
                  <span class="token punctuation">}</span>
              <span class="token punctuation">}</span>
          <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
      <span class="token punctuation">}</span>
      kafkaProducer<span class="token punctuation">.</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  <span class="token punctuation">}</span>
  <span class="token punctuation">}</span>
</code></pre></div><h4 id="_5-3-2-同步发送"><a href="#_5-3-2-同步发送" class="header-anchor">#</a> 5.3.2  同步发送</h4> <p>同步发送的意思就是，一条消息发送之后，会阻塞当前线程，直至返回 ack。 由于 send 方法返回的是一个 Future 对象，根据 Future 对象的特点，我们也可以实现同步发送的效果，只需调用 Future 对象的 get 方法即可。</p> <div class="language-java extra-class"><pre class="language-java"><code>producer<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token keyword">record</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
</code></pre></div><h3 id="_5-4-创建消费者"><a href="#_5-4-创建消费者" class="header-anchor">#</a> 5.4 创建消费者</h3> <p>Consumer 消费数据时的可靠性是很容易保证的，因为数据在 Kafka 中是持久化的，故不用担心数据丢失问题。 由于 consumer 在消费过程中可能会出现断电宕机等故障，consumer 恢复后，需要从故障前的位置继续消费，所以 consumer 需要实时记录自己消费到了哪个 offset，以便故障恢复后继续消费。 所以 offset 的维护是 Consumer 消费数据时必须考虑的问题。</p> <p>Kafka 提供了两套 consumer API： 高级 Consumer API 和低级 Consumer API。</p> <h4 id="_4-3-1-高级-api-自动提交offset"><a href="#_4-3-1-高级-api-自动提交offset" class="header-anchor">#</a> 4.3.1 高级 API（自动提交offset）</h4> <ul><li><p>高级 API 优点</p> <ul><li>高级 API 写起来简单</li> <li>不需要自行去管理 offset，系统通过 zookeeper 自行管理。</li> <li>不需要管理分区，副本等情况， 系统自动管理。 消费者断线会自动根据上一次记录在 zookeeper 中的 offset 去接着获取数据（默认设置 1 分钟更新一下 zookeeper 中存的 offset） 可以使用 group 来区分对同一个 topic 的不同程序访问分离开来（不同的 group 记录不同的 offset，这样不同程序读取同一个 topic 才不会因为 offset 互相影响）</li></ul></li> <li><p>高级 API 缺点</p> <ul><li>不能自行控制 offset（对于某些特殊需求来说） 不能细化控制如分区、副本、 zk 等</li></ul> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>consume</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span></span><span class="token class-name">ConsumerRecord</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span></span><span class="token class-name">ConsumerRecords</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span></span><span class="token class-name">KafkaConsumer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Arrays</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Properties</span><span class="token punctuation">;</span>

<span class="token comment">/**
 * @date: 2019/9/9 16:58
 * @description: 高级 API:官方提供案例（自动维护消费情况）（新 API）
 */</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">CustomNewConsumer</span> <span class="token punctuation">{</span>

    <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token class-name">Properties</span> props <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Properties</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 定义 kafka 服务的地址，不需要将所有 broker 指定上</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;bootstrap.servers&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;10.121.214.96:9092&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 指定 consumer group</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;group.id&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;test&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 是否自动确认 offset</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;enable.auto.commit&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;true&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 自动确认 offset 的时间间隔</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;auto.commit.interval.ms&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1000&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// key 的反序列化类</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;key.deserializer&quot;</span><span class="token punctuation">,</span>
                <span class="token string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// value 的反序列化类</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;value.deserializer&quot;</span><span class="token punctuation">,</span>
                <span class="token string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 定义 consumer</span>
        <span class="token class-name">KafkaConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> consumer <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">KafkaConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span>props<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 消费者订阅的 topic, 可同时订阅多个</span>
        consumer<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span><span class="token class-name">Arrays</span><span class="token punctuation">.</span><span class="token function">asList</span><span class="token punctuation">(</span><span class="token string">&quot;first&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;second&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;third&quot;</span><span class="token punctuation">,</span><span class="token string">&quot;learn-java-kafka&quot;</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token keyword">while</span> <span class="token punctuation">(</span><span class="token boolean">true</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
            <span class="token comment">// 读取数据，读取超时时间为 100ms</span>
            <span class="token class-name">ConsumerRecords</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> records <span class="token operator">=</span> consumer<span class="token punctuation">.</span><span class="token function">poll</span><span class="token punctuation">(</span><span class="token number">100</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token class-name">ConsumerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token keyword">record</span> <span class="token operator">:</span> records<span class="token punctuation">)</span> <span class="token punctuation">{</span>
                <span class="token class-name">System</span><span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">printf</span><span class="token punctuation">(</span><span class="token string">&quot;offset = %d, key = %s, value = %s%n&quot;</span><span class="token punctuation">,</span>
                        <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">offset</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">key</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">value</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token punctuation">}</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div><p><strong>结果：</strong></p> <p><img src="H:/Technical-Learning/docs/_images/message-queue/Kafka/kakfa-java-demo.png" alt="Kafka Java 消费者示例的运行结果"></p></li></ul> <h4 id="_4-3-2-低级-api-手动提交offset"><a href="#_4-3-2-低级-api-手动提交offset" class="header-anchor">#</a> 4.3.2 低级 API(手动提交offset)</h4> <p>虽然自动提交 offset 十分简洁便利，但由于其是基于时间提交的，开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。</p> <ul><li><p>低级 API 优点</p> <ul><li><strong>能够让开发者自己控制 offset，想从哪里读取就从哪里读取。</strong></li> <li>自行控制连接分区，对分区自定义进行负载均衡</li> <li>对 zookeeper 的依赖性降低（如： offset 不一定非要靠 zk 存储，自行存储 offset 即可， 比如存在文件或者内存中）</li></ul></li> <li><p>低级 API 缺点</p> <ul><li>太过复杂，需要自行控制 offset，连接哪个分区，找到分区 leader 等。</li></ul></li> <li><p>手动提交offset的方法有两种，分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是，都会将本次poll的一批数据最高的偏移量提交；不同点是，commitSync阻塞当前线程，一直到提交成功，并且会自动失败重试（由不可控因素导致，也会出现提交失败）；而commitAsync则没有失败重试机制，故有可能提交失败。</p> <h5 id="_1-同步提交offset"><a href="#_1-同步提交offset" class="header-anchor">#</a> 1) 同步提交offset</h5> <p>由于同步提交 offset 有失败重试机制，故更加可靠，以下为同步提交 offset 的示例。</p></li></ul> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>consume</span><span class="token punctuation">;</span>

<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span></span><span class="token class-name">ConsumerConfig</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span></span><span class="token class-name">ConsumerRecord</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span></span><span class="token class-name">ConsumerRecords</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span></span><span class="token class-name">KafkaConsumer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Arrays</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Properties</span><span class="token punctuation">;</span>
<span class="token comment">/**
 * @date: 2019/10/10 16:33
 * @description: 同步手动提交offset
 */</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">CommitSyncCounsumer</span> <span class="token punctuation">{</span>

    <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token class-name">Properties</span> props <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Properties</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;bootstrap.servers&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;192.121.214.51:9092&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token class-name">ConsumerConfig</span><span class="token punctuation">.</span>GROUP_ID_CONFIG<span class="token punctuation">,</span> <span class="token string">&quot;test&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>

        <span class="token comment">//1. 关闭自动提交offset</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;enable.auto.commit&quot;</span><span class="token punctuation">,</span><span class="token string">&quot;false&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;auto.commit.interval.ms&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;1000&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;key.deserializer&quot;</span><span class="token punctuation">,</span>
                <span class="token string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;value.deserializer&quot;</span><span class="token punctuation">,</span>
                <span class="token string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token class-name">KafkaConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> consumer <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">KafkaConsumer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span>props<span class="token punctuation">)</span><span class="token punctuation">;</span>
        consumer<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span><span class="token class-name">Arrays</span><span class="token punctuation">.</span><span class="token function">asList</span><span class="token punctuation">(</span><span class="token string">&quot;first&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;second&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;third&quot;</span><span class="token punctuation">,</span><span class="token string">&quot;learn-java-kafka&quot;</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token keyword">while</span> <span class="token punctuation">(</span><span class="token boolean">true</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
            <span class="token class-name">ConsumerRecords</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> records <span class="token operator">=</span> consumer<span class="token punctuation">.</span><span class="token function">poll</span><span class="token punctuation">(</span><span class="token number">100</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token class-name">ConsumerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token keyword">record</span> <span class="token operator">:</span> records<span class="token punctuation">)</span> <span class="token punctuation">{</span>
                <span class="token class-name">System</span><span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">printf</span><span class="token punctuation">(</span><span class="token string">&quot;offset = %d, key = %s, value = %s%n&quot;</span><span class="token punctuation">,</span>
                        <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">offset</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">key</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">value</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token punctuation">}</span>
            <span class="token comment">//2. 同步提交 offset：阻塞当前线程直到提交成功</span>
            consumer<span class="token punctuation">.</span><span class="token function">commitSync</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div><h5 id="_2-异步提交offset"><a href="#_2-异步提交offset" class="header-anchor">#</a> 2) 异步提交offset</h5> <p>虽然同步提交 offset 更可靠一些，但是由于其会阻塞当前线程，直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下，会选用异步提交 offset 的方式。</p> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">while</span> <span class="token punctuation">(</span><span class="token boolean">true</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
    <span class="token class-name">ConsumerRecords</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> records <span class="token operator">=</span> consumer<span class="token punctuation">.</span><span class="token function">poll</span><span class="token punctuation">(</span><span class="token number">100</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token class-name">ConsumerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token keyword">record</span> <span class="token operator">:</span> records<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token class-name">System</span><span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">printf</span><span class="token punctuation">(</span><span class="token string">&quot;offset = %d, key = %s, value = %s%n&quot;</span><span class="token punctuation">,</span> <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">offset</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">key</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> <span class="token keyword">record</span><span class="token punctuation">.</span><span class="token function">value</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
    consumer<span class="token punctuation">.</span><span class="token function">commitAsync</span><span class="token punctuation">(</span><span class="token keyword">new</span> <span class="token class-name">OffsetCommitCallback</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token annotation punctuation">@Override</span>
        <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">onComplete</span><span class="token punctuation">(</span><span class="token class-name">Map</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">TopicPartition</span><span class="token punctuation">,</span> <span class="token class-name">OffsetAndMetadata</span><span class="token punctuation">&gt;</span></span> map<span class="token punctuation">,</span> <span class="token class-name">Exception</span> e<span class="token punctuation">)</span> <span class="token punctuation">{</span>
            <span class="token keyword">if</span><span class="token punctuation">(</span>e <span class="token operator">!=</span> <span class="token keyword">null</span><span class="token punctuation">)</span><span class="token punctuation">{</span>
                <span class="token class-name">System</span><span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">&quot;commit failed&quot;</span><span class="token operator">+</span>map<span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token punctuation">}</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
<span class="token punctuation">}</span>
</code></pre></div><p>还可以用官方提供的例子跑一下：<a href="https://github.com/apache/kafka/tree/trunk/examples" target="_blank" rel="noopener noreferrer">https://github.com/apache/kafka/tree/trunk/examples<svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></p> <h5 id="_3-数据漏消费和重复消费分析"><a href="#_3-数据漏消费和重复消费分析" class="header-anchor">#</a> 3) 数据漏消费和重复消费分析</h5> <p>无论是同步提交还是异步提交 offset，都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费，有可能造成数据的漏消费；而先消费后提交 offset，有可能会造成数据的重复消费。</p> <hr> <h4 id="_4-3-3-自定义存储offset"><a href="#_4-3-3-自定义存储offset" class="header-anchor">#</a> 4.3.3 自定义存储offset</h4> <p>Kafka 0.9 版本之前，offset 存储在 ZooKeeper，0.9 版本及之后，默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外，Kafka 还可以选择自定义存储 offset。</p> <p>offset 的维护是相当繁琐的，因为需要考虑到消费者的 Rebalance。</p> <p>当有新的消费者加入消费者组、已有的消费者退出消费者组或者所订阅的主题的分区发生变化，就会触发分区的重新分配，重新分配的过程叫做 Rebalance。</p> <p>消费者发生 Rebalance 之后，每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区，并且定位到每个分区最近提交的 offset 位置继续消费。</p> <p>要实现自定义存储 offset，需要借助 ConsumerRebalanceListener，以下为示例代码，其中提交和获取 offset 的方法，需要根据所选的 offset 存储系统自行实现。</p> <h2 id="_6-kafka-producer-拦截器-interceptor"><a href="#_6-kafka-producer-拦截器-interceptor" class="header-anchor">#</a> 6. 
Kafka producer 拦截器(interceptor)</h2> <h3 id="_6-1-拦截器原理"><a href="#_6-1-拦截器原理" class="header-anchor">#</a> 6.1 拦截器原理</h3> <p>Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的，主要用于实现 clients 端的定制化控制逻辑。对于 producer 而言，interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化处理，比如<strong>修改消息</strong>等。同时，producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Interceptor 的实现接口是 <strong>org.apache.kafka.clients.producer.ProducerInterceptor</strong>，其定义的方法包括：</p> <ol><li><p>configure(configs)：获取配置信息和初始化数据时调用。</p></li> <li><p>onSend(ProducerRecord)：该方法封装进 KafkaProducer.send 方法中，即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作，但最好保证不要修改消息所属的 topic 和分区，否则会影响目标分区的计算。</p></li> <li><p>onAcknowledgement(RecordMetadata, Exception)：该方法会在消息被应答或消息发送失败时调用，并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中，因此不要在该方法中放入很重的逻辑，否则会拖慢 producer 的消息发送效率。</p></li> <li><p>close：关闭 interceptor，主要用于执行一些资源清理工作。如前所述，interceptor 可能被运行在多个线程中，因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor，则 producer 将按照指定顺序调用它们，并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非再向上传递。这在使用过程中要特别留意。</p></li></ol> <h3 id="_6-2-拦截器案例"><a href="#_6-2-拦截器案例" class="header-anchor">#</a> 6.2 拦截器案例</h3> <ul><li>需求：实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部；第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。</li></ul> <p><img src="H:/Technical-Learning/docs/_images/message-queue/Kafka/interceptor-demo.png" alt="双 interceptor 拦截链示意图"></p> <ul><li><p>案例实操</p> <ul><li><p>增加时间戳拦截器</p> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>interceptor</span><span class="token punctuation">;</span>

<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerInterceptor</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerRecord</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">RecordMetadata</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Map</span><span class="token punctuation">;</span>

<span class="token comment">/**
 * @description: 增加时间拦截器，在发送消息时增加时间
 */</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">TimeInterceptor</span> <span class="token keyword">implements</span> <span class="token class-name">ProducerInterceptor</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token punctuation">{</span>
    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token function">onSend</span><span class="token punctuation">(</span><span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> producerRecord<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token comment">// 创建一个新的 record，把时间戳写入消息体的最前部</span>
        <span class="token keyword">return</span> <span class="token keyword">new</span> <span class="token class-name">ProducerRecord</span><span class="token punctuation">(</span>producerRecord<span class="token punctuation">.</span><span class="token function">topic</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> producerRecord<span class="token punctuation">.</span><span class="token function">partition</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
                producerRecord<span class="token punctuation">.</span><span class="token function">timestamp</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> producerRecord<span class="token punctuation">.</span><span class="token function">key</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
                <span class="token class-name">System</span><span class="token punctuation">.</span><span class="token function">currentTimeMillis</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">+</span> <span class="token string">&quot;,&quot;</span> <span class="token operator">+</span> producerRecord<span class="token punctuation">.</span><span class="token function">value</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">toString</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">onAcknowledgement</span><span class="token punctuation">(</span><span class="token class-name">RecordMetadata</span> recordMetadata<span class="token punctuation">,</span> <span class="token class-name">Exception</span> e<span class="token punctuation">)</span> <span class="token punctuation">{</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
    <span class="token punctuation">}</span>
    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">configure</span><span class="token punctuation">(</span><span class="token class-name">Map</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token operator">?</span><span class="token punctuation">&gt;</span></span> map<span class="token punctuation">)</span> <span class="token punctuation">{</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div></li> <li><p>统计发送消息成功和发送失败消息数，并在 producer 关闭时打印这两个计数器</p></li></ul> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>interceptor</span><span class="token punctuation">;</span>

<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerInterceptor</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerRecord</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">RecordMetadata</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Map</span><span class="token punctuation">;</span>

<span class="token comment">/**
 * @description:统计发送消息成功和发送失败消息数，并在 producer 关闭时打印这两个计数器
 */</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">CounterInterceptor</span>  <span class="token keyword">implements</span> <span class="token class-name">ProducerInterceptor</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token punctuation">{</span>
    <span class="token keyword">private</span> <span class="token keyword">int</span> errorCounter <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span>
    <span class="token keyword">private</span> <span class="token keyword">int</span> successCounter <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token function">onSend</span><span class="token punctuation">(</span><span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> producerRecord<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token keyword">return</span> producerRecord<span class="token punctuation">;</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">onAcknowledgement</span><span class="token punctuation">(</span><span class="token class-name">RecordMetadata</span> recordMetadata<span class="token punctuation">,</span> <span class="token class-name">Exception</span> e<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token comment">// 统计成功和失败的次数</span>
        <span class="token keyword">if</span> <span class="token punctuation">(</span>e <span class="token operator">==</span> <span class="token keyword">null</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
            successCounter<span class="token operator">++</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
            errorCounter<span class="token operator">++</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token comment">// 保存结果</span>
        <span class="token class-name">System</span><span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">&quot;Successful sent: &quot;</span> <span class="token operator">+</span> successCounter<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token class-name">System</span><span class="token punctuation">.</span>out<span class="token punctuation">.</span><span class="token function">println</span><span class="token punctuation">(</span><span class="token string">&quot;Failed sent: &quot;</span> <span class="token operator">+</span> errorCounter<span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">configure</span><span class="token punctuation">(</span><span class="token class-name">Map</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token operator">?</span><span class="token punctuation">&gt;</span></span> map<span class="token punctuation">)</span> <span class="token punctuation">{</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div><ul><li>创建生产者</li></ul> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>interceptor</span><span class="token punctuation">;</span>

<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">KafkaProducer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">Producer</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerConfig</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>producer<span class="token punctuation">.</span></span><span class="token class-name">ProducerRecord</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">ArrayList</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">List</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Properties</span><span class="token punctuation">;</span>

<span class="token comment">/**
 * @description: 带拦截器的producer 主程序
 */</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">InterceptorProducer</span> <span class="token punctuation">{</span>

    <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token keyword">throws</span> <span class="token class-name">Exception</span> <span class="token punctuation">{</span>
        <span class="token comment">// 1 设置配置信息</span>
        <span class="token class-name">Properties</span> props <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Properties</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;bootstrap.servers&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;10.121.214.96:9092&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;acks&quot;</span><span class="token punctuation">,</span> <span class="token string">&quot;all&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;retries&quot;</span><span class="token punctuation">,</span> <span class="token number">0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;batch.size&quot;</span><span class="token punctuation">,</span> <span class="token number">16384</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;linger.ms&quot;</span><span class="token punctuation">,</span> <span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;buffer.memory&quot;</span><span class="token punctuation">,</span> <span class="token number">33554432</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;key.serializer&quot;</span><span class="token punctuation">,</span>
                <span class="token string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token string">&quot;value.serializer&quot;</span><span class="token punctuation">,</span>
                <span class="token string">&quot;org.apache.kafka.common.serialization.StringSerializer&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 2 构建拦截链</span>
        <span class="token class-name">List</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> interceptors <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">ArrayList</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        interceptors<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token string">&quot;priv.learn.interceptor.TimeInterceptor&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        interceptors<span class="token punctuation">.</span><span class="token function">add</span><span class="token punctuation">(</span><span class="token string">&quot;priv.learn.interceptor.CounterInterceptor&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        props<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token class-name">ProducerConfig</span><span class="token punctuation">.</span>INTERCEPTOR_CLASSES_CONFIG<span class="token punctuation">,</span> interceptors<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token class-name">String</span> topic <span class="token operator">=</span> <span class="token string">&quot;first&quot;</span><span class="token punctuation">;</span>
        <span class="token class-name">Producer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> producer <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">KafkaProducer</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span>props<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 3 发送消息</span>
        <span class="token keyword">for</span> <span class="token punctuation">(</span><span class="token keyword">int</span> i <span class="token operator">=</span> <span class="token number">0</span><span class="token punctuation">;</span> i <span class="token operator">&lt;</span> <span class="token number">10</span><span class="token punctuation">;</span> i<span class="token operator">++</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
            <span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">String</span><span class="token punctuation">&gt;</span></span> <span class="token keyword">record</span> <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">ProducerRecord</span><span class="token generics"><span class="token punctuation">&lt;</span><span class="token punctuation">&gt;</span></span><span class="token punctuation">(</span>topic<span class="token punctuation">,</span>
                    <span class="token string">&quot;message&quot;</span> <span class="token operator">+</span> i<span class="token punctuation">)</span><span class="token punctuation">;</span>
            producer<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token keyword">record</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>
        <span class="token comment">// 4 一定要关闭 producer，这样才会调用 interceptor 的 close 方法</span>
        producer<span class="token punctuation">.</span><span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div></li></ul> <hr> <h2 id="_7-kafka-streams"><a href="#_7-kafka-streams" class="header-anchor">#</a> 7. kafka Streams</h2> <p>Kafka Streams 是 Apache Kafka 开源项目的一个组成部分，是一个功能强大、易于使用的库，用于在 Kafka 上构建高可用、分布式、可扩展、容错的应用程序。</p> <h4 id="_7-1-kafka-streams-特点"><a href="#_7-1-kafka-streams-特点" class="header-anchor">#</a> 7.1 Kafka Streams 特点</h4> <ol><li><p>功能强大：高扩展性，弹性，容错</p></li> <li><p>轻量级：无需专门的集群；是一个库，而不是框架</p></li> <li><p>完全集成：100% 兼容 Kafka 0.10.0 版本，易于集成到现有的应用程序</p></li> <li><p>实时性：毫秒级延迟、并非微批处理、窗口允许乱序数据、允许迟到数据</p></li></ol> <h4 id="_7-2-为什么要有-kafka-stream"><a href="#_7-2-为什么要有-kafka-stream" class="header-anchor">#</a> 7.2 为什么要有 Kafka Stream</h4> <p>​	当前已经有非常多的流式处理系统，最知名且应用最多的开源流式处理系统有 Spark Streaming 和 Apache Storm。 Apache Storm 发展多年，应用广泛，提供记录级别的处理能力，当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark，可以非常方便与图计算， SQL 处理等集成，功能强大，对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外， 目前主流的 Hadoop 发行版，如 Cloudera 和 Hortonworks，都集成了 Apache Storm 和 Apache Spark，使得部署更容易。 既然 Apache Spark 与 Apache Storm 拥有如此多的优势，那为何还需要 Kafka Stream 呢？</p> <p>主要有如下原因。</p> <p>​	第一， Spark 和 Storm 都是流式处理框架，而 <strong>Kafka Stream 提供的是一个基于 Kafka 的流式处理类库</strong>。框架要求开发者按照特定的方式去开发逻辑部分，供框架调用。开发者很难了解框架的具体运行方式，从而使得调试成本高，并且使用受限。而 Kafka Stream 作为流式处理类库，直接提供具体的类给开发者调用，整个应用的运行方式主要由开发者控制，方便使用和调试。</p> <p><img src="../../_images/message-queue/Kafka/kakfa-streams-flow.png" alt="Kafka Streams 处理流程示意图"></p> <p>​	第二，虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署，但是这些框架的部署仍然相对复杂。而 <strong>Kafka Stream 作为类库，可以非常方便的嵌入应用程序中，它对应用的打包和部署基本没有任何要求</strong>。</p> <p>​	第三，就流式处理系统而言，基本都支持 Kafka 作为数据源。例如 Storm 具有专门的 kafka-spout，而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上， Kafka 基本上是主流的流式处理系统的标准数据源。换言之， <strong>大部分流式系统中都已部署了 Kafka，此时使用 Kafka Stream 的成本非常低</strong>。</p> <p>​	第四， <strong>使用 Storm 或 Spark Streaming 时，需要为框架本身的进程预留资源</strong>，如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言，框架本身也会占用部分资源，如 Spark Streaming 需要为 shuffle 和 storage 预留内存。 但是 Kafka Stream 作为类库不占用系统资源。</p> <p>​	第五，由于 <strong>Kafka 本身提供数据持久化</strong>，因此 Kafka Stream 提供滚动部署和滚动升级以及重新计算的能力。</p>
<p>​	第六，由于 Kafka Consumer Rebalance 机制， <strong>Kafka Stream 可以在线动态调整并行度</strong>。</p> <h4 id="_7-3-kafka-stream-数据清洗案例"><a href="#_7-3-kafka-stream-数据清洗案例" class="header-anchor">#</a> 7.3 Kafka Stream 数据清洗案例</h4> <ol><li><p>需求：实时处理带有“&gt;&gt;&gt;”前缀的内容。例如输入“atguigu&gt;&gt;&gt;ximenqing”，最终处理成“ximenqing”。</p> <p><img src="../../_images/message-queue/Kafka/kafka-streams-data-clean.png" alt="Kafka Streams 数据清洗示意图"></p></li> <li><p>demo</p> <div class="language-xml extra-class"><pre class="language-xml"><code><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.apache.kafka<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>kafka-streams<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>version</span><span class="token punctuation">&gt;</span></span>0.11.0.2<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>version</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre></div></li></ol> <div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>stream</span><span class="token punctuation">;</span>

<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span></span><span class="token class-name">Processor</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span></span><span class="token class-name">ProcessorContext</span><span class="token punctuation">;</span>

<span class="token comment">/**
 * @description: 具体业务处理
 */</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">LogProcessor</span> <span class="token keyword">implements</span> <span class="token class-name">Processor</span><span class="token operator">&lt;</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>

    <span class="token keyword">private</span> <span class="token class-name">ProcessorContext</span> context<span class="token punctuation">;</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">init</span><span class="token punctuation">(</span><span class="token class-name">ProcessorContext</span> context<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token keyword">this</span><span class="token punctuation">.</span>context <span class="token operator">=</span> context<span class="token punctuation">;</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">process</span><span class="token punctuation">(</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span> key<span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span> value<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token class-name">String</span> input <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">String</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 如果包含“&gt;&gt;&gt;”则只保留该标记后面的内容</span>
        <span class="token keyword">if</span> <span class="token punctuation">(</span>input<span class="token punctuation">.</span><span class="token function">contains</span><span class="token punctuation">(</span><span class="token string">&quot;&gt;&gt;&gt;&quot;</span><span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
            input <span class="token operator">=</span> input<span class="token punctuation">.</span><span class="token function">split</span><span class="token punctuation">(</span><span class="token string">&quot;&gt;&gt;&gt;&quot;</span><span class="token punctuation">)</span><span class="token punctuation">[</span><span class="token number">1</span><span class="token punctuation">]</span><span class="token punctuation">.</span><span class="token function">trim</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
            <span class="token comment">// 输出到下一个 topic</span>
            context<span class="token punctuation">.</span><span class="token function">forward</span><span class="token punctuation">(</span><span class="token string">&quot;logProcessor&quot;</span><span class="token punctuation">.</span><span class="token function">getBytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> input<span class="token punctuation">.</span><span class="token function">getBytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
            context<span class="token punctuation">.</span><span class="token function">forward</span><span class="token punctuation">(</span><span class="token string">&quot;logProcessor&quot;</span><span class="token punctuation">.</span><span class="token function">getBytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> input<span class="token punctuation">.</span><span class="token function">getBytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token punctuation">}</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">punctuate</span><span class="token punctuation">(</span><span class="token keyword">long</span> timestamp<span class="token punctuation">)</span> <span class="token punctuation">{</span>
    <span class="token punctuation">}</span>

    <span class="token annotation punctuation">@Override</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">close</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div><div class="language-java extra-class"><pre class="language-java"><code><span class="token keyword">package</span> <span class="token namespace">priv<span class="token punctuation">.</span>learn<span class="token punctuation">.</span>stream</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">java<span class="token punctuation">.</span>util<span class="token punctuation">.</span></span><span class="token class-name">Properties</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span></span><span class="token class-name">KafkaStreams</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span></span><span class="token class-name">StreamsConfig</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span></span><span class="token class-name">Processor</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span></span><span class="token class-name">ProcessorSupplier</span><span class="token punctuation">;</span>
<span class="token keyword">import</span> <span class="token namespace">org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>streams<span class="token punctuation">.</span>processor<span class="token punctuation">.</span></span><span class="token class-name">TopologyBuilder</span><span class="token punctuation">;</span>

<span class="token comment">/**
 * @description:主类
 */</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">Application</span> <span class="token punctuation">{</span>

    <span class="token keyword">public</span> <span class="token keyword">static</span> <span class="token keyword">void</span> <span class="token function">main</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">[</span><span class="token punctuation">]</span> args<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        <span class="token comment">// 定义输入的 topic</span>
        <span class="token class-name">String</span> from <span class="token operator">=</span> <span class="token string">&quot;first&quot;</span><span class="token punctuation">;</span>
        <span class="token comment">// 定义输出的 topic</span>
        <span class="token class-name">String</span> <span class="token keyword">to</span> <span class="token operator">=</span> <span class="token string">&quot;second&quot;</span><span class="token punctuation">;</span>
        <span class="token comment">// 设置参数</span>
        <span class="token class-name">Properties</span> settings <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">Properties</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        settings<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token class-name">StreamsConfig</span><span class="token punctuation">.</span>APPLICATION_ID_CONFIG<span class="token punctuation">,</span> <span class="token string">&quot;logFilter&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        settings<span class="token punctuation">.</span><span class="token function">put</span><span class="token punctuation">(</span><span class="token class-name">StreamsConfig</span><span class="token punctuation">.</span>BOOTSTRAP_SERVERS_CONFIG<span class="token punctuation">,</span> <span class="token string">&quot;hadoop102:9092&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token class-name">StreamsConfig</span> config <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">StreamsConfig</span><span class="token punctuation">(</span>settings<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 构建拓扑</span>
        <span class="token class-name">TopologyBuilder</span> builder <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">TopologyBuilder</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        builder<span class="token punctuation">.</span><span class="token function">addSource</span><span class="token punctuation">(</span><span class="token string">&quot;SOURCE&quot;</span><span class="token punctuation">,</span> from<span class="token punctuation">)</span>
                <span class="token punctuation">.</span><span class="token function">addProcessor</span><span class="token punctuation">(</span><span class="token string">&quot;PROCESS&quot;</span><span class="token punctuation">,</span> <span class="token keyword">new</span> <span class="token class-name">ProcessorSupplier</span><span class="token operator">&lt;</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token operator">&gt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
                    <span class="token annotation punctuation">@Override</span>
                    <span class="token keyword">public</span> <span class="token class-name">Processor</span><span class="token operator">&lt;</span><span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">,</span> <span class="token keyword">byte</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token operator">&gt;</span> <span class="token function">get</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
                        <span class="token comment">// 具体分析处理</span>
                        <span class="token keyword">return</span> <span class="token keyword">new</span> <span class="token class-name">LogProcessor</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
                    <span class="token punctuation">}</span>
                <span class="token punctuation">}</span><span class="token punctuation">,</span> <span class="token string">&quot;SOURCE&quot;</span><span class="token punctuation">)</span>
                <span class="token punctuation">.</span><span class="token function">addSink</span><span class="token punctuation">(</span><span class="token string">&quot;SINK&quot;</span><span class="token punctuation">,</span> <span class="token keyword">to</span><span class="token punctuation">,</span> <span class="token string">&quot;PROCESS&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token comment">// 创建 kafka stream</span>
        <span class="token class-name">KafkaStreams</span> streams <span class="token operator">=</span> <span class="token keyword">new</span> <span class="token class-name">KafkaStreams</span><span class="token punctuation">(</span>builder<span class="token punctuation">,</span> config<span class="token punctuation">)</span><span class="token punctuation">;</span>
        streams<span class="token punctuation">.</span><span class="token function">start</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>
</code></pre></div><hr> <h2 id="_8-扩展"><a href="#_8-扩展" class="header-anchor">#</a> 8. 扩展</h2> <h3 id="_8-1-kafka-与-flume-比较"><a href="#_8-1-kafka-与-flume-比较" class="header-anchor">#</a> 8.1 Kafka 与 Flume 比较</h3> <p>在企业中必须要清楚流式数据采集框架 flume 和 kafka 的定位是什么：</p> <h5 id="flume"><a href="#flume" class="header-anchor">#</a> flume：</h5> <ul><li>cloudera 公司研发: 适合多个生产者；</li> <li>适合下游数据消费者不多的情况；</li> <li>适合数据安全性要求不高的操作；</li> <li>适合与 Hadoop 生态圈对接的操作。</li></ul> <h5 id="kafka"><a href="#kafka" class="header-anchor">#</a> kafka：</h5> <ul><li><p>linkedin 公司研发: 适合数据下游消费众多的情况；</p></li> <li><p>适合数据安全性要求较高的操作，支持 replication。</p></li></ul> <p>因此我们常用的一种模型是： 线上数据 --&gt; flume --&gt; kafka --&gt; flume(根据情景增删该流程) --&gt; HDFS</p> <h3 id="_8-2-kafka-配置信息"><a href="#_8-2-kafka-配置信息" class="header-anchor">#</a> 8.2 Kafka 配置信息</h3> <h5 id="broker-配置信息"><a href="#broker-配置信息" class="header-anchor">#</a> Broker 配置信息</h5> <table><thead><tr><th>Property</th> <th>Default</th> <th>Description</th></tr></thead> <tbody><tr><td><strong>broker.id</strong></td> <td></td> <td>必填参数，broker 的唯一标识 ，每个broker都可以用一个唯一的非负整数id进行标识；这个id可以作为broker的“名字”</td></tr> <tr><td><strong>log.dirs</strong></td> <td>/tmp/kafka-logs</td> <td>Kafka 数据存放的目录。可以指定多个目录， 中间用逗号分隔，当新 partition 被创建的时 会被存放到当前存放 partition 最少的目录。</td></tr> <tr><td><strong>port</strong></td> <td>6667</td> <td>server接受客户端连接的端口</td></tr> <tr><td><strong>zookeeper.connect</strong></td> <td>null</td> <td>ZooKeeper连接字符串的格式为：hostname:port，此处hostname和port分别是ZooKeeper集群中某个节点的host和port；为了当某个host宕掉之后你能通过其他ZooKeeper节点进行连接，你可以按照一下方式制定多个hosts： hostname1:port1, hostname2:port2, hostname3:port3.  
ZooKeeper   允许你增加一个“chroot”路径，将集群中所有kafka数据存放在特定的路径下。当多个Kafka集群或者其他应用使用相同ZooKeeper集群时，可以使用这个方式设置数据存放路径。这种方式的实现可以通过这样设置连接字符串格式，如下所示： hostname1：port1，hostname2：port2，hostname3：port3/chroot/path 这样设置就将所有kafka集群数据存放在/chroot/path路径下。注意，在你启动broker之前，你必须创建这个路径，并且consumers必须使用相同的连接格式。</td></tr> <tr><td>message.max.bytes</td> <td>1000000</td> <td>server可以接收的消息最大尺寸。重要的是，consumer和producer有关这个属性的设置必须同步，否则producer发布的消息对consumer来说太大。</td></tr> <tr><td>num.network.threads</td> <td>3</td> <td>server用来处理网络请求的网络线程数目；一般你不需要更改这个属性。</td></tr> <tr><td>num.io.threads</td> <td>8</td> <td>server用来处理请求的I/O线程的数目；这个线程数目至少要等于硬盘的个数。</td></tr> <tr><td>background.threads</td> <td>4</td> <td>用于后台处理的线程数目，例如文件删除；你不需要更改这个属性。</td></tr> <tr><td>queued.max.requests</td> <td>500</td> <td>在网络线程停止读取新请求之前，可以排队等待I/O线程处理的最大请求个数。</td></tr> <tr><td>host.name</td> <td>null</td> <td>broker的hostname；如果hostname已经设置的话，broker将只会绑定到这个地址上；如果没有设置，它将绑定到所有接口，并发布一份到ZK</td></tr> <tr><td>advertised.host.name</td> <td>null</td> <td>如果设置，则就作为broker 的hostname发往producer、consumers以及其他brokers</td></tr> <tr><td>advertised.port</td> <td>null</td> <td>此端口将给与producers、consumers、以及其他brokers，它会在建立连接时用到； 它仅在实际端口和server需要绑定的端口不一样时才需要设置。</td></tr> <tr><td>socket.send.buffer.bytes</td> <td>100 * 1024</td> <td>SO_SNDBUF 缓存大小，server进行socket 连接所用</td></tr> <tr><td>socket.receive.buffer.bytes</td> <td>100 * 1024</td> <td>SO_RCVBUF缓存大小，server进行socket连接时所用</td></tr> <tr><td>socket.request.max.bytes</td> <td>100 * 1024 * 1024</td> <td>server允许的最大请求尺寸；  这将避免server溢出，它应该小于Java  heap size</td></tr> <tr><td>num.partitions</td> <td>1</td> <td>如果创建topic时没有给出划分partitions个数，这个数字将是topic下partitions数目的默认数值。</td></tr> <tr><td>log.segment.bytes</td> <td>1024 * 1024 * 1024</td> <td>topic  partition的日志存放在某个目录下诸多文件中，这些文件将partition的日志切分成一段一段的；这个属性就是每个文件的最大尺寸；当尺寸达到这个数值时，就会创建新文件。此设置可以在每个topic的配置中进行覆盖。 查看  <a href="http://kafka.apache.org/documentation.html#topic-config" target="_blank" rel="noopener noreferrer">the per-topic  configuration section<svg 
xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></td></tr> <tr><td>log.roll.{ms,hours}</td> <td>24 * 7 hours</td> <td>新建 segment 文件的时间，此值可以被 topic 级别的参数覆盖。</td></tr> <tr><td>log.retention.{ms,minutes,hours}</td> <td>7 days</td> <td>Kafka segment log 的保存周期，保存周期超过此时间日志就会被删除。此参数可以被 topic 级别参数覆盖。数据量大时，建议减小此值。</td></tr> <tr><td>log.retention.bytes</td> <td>-1</td> <td>每个 partition 的最大容量，若数据量超过此值，partition 数据将会被删除。注意这个参数控制的是每个 partition 而不是 topic。此参数可以被 topic 级别参数覆盖。</td></tr> <tr><td>log.retention.check.interval.ms</td> <td>5 minutes</td> <td>检查日志分段文件的间隔时间，以确定文件属性是否达到删除要求。</td></tr> <tr><td>log.cleaner.enable</td> <td>false</td> <td>当这个属性设置为false时，一旦日志的保存时间或者大小达到上限时，就会被删除；如果设置为true，则当保存属性达到上限时，就会进行<a href="https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction" target="_blank" rel="noopener noreferrer">log compaction<svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a>。</td></tr> <tr><td>log.cleaner.threads</td> <td>1</td> <td>进行日志压缩的线程数</td></tr> <tr><td>log.cleaner.dedupe.buffer.size</td> <td>500 * 1024 * 1024</td> <td>log cleaner清除过程中针对日志进行索引化以及精简化所用到的缓存大小。最好设置大点，以提供充足的内存。</td></tr> <tr><td>log.cleaner.io.buffer.size</td> <td>512*1024</td> <td>进行log cleaning时所需要的I/O 
chunk尺寸。你不需要更改这项设置。</td></tr> <tr><td>log.cleaner.io.buffer.load.factor</td> <td>0.9</td> <td>log cleaning中所使用的hash表的负载因子；你不需要更改这个选项。</td></tr> <tr><td>log.cleaner.backoff.ms</td> <td>15000</td> <td>进行日志是否清理检查的时间间隔</td></tr> <tr><td>log.cleaner.min.cleanable.ratio</td> <td>0.5</td> <td>这项配置控制log  compactor试图清理日志的频率（假定<a href="https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction" target="_blank" rel="noopener noreferrer">log compaction<svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a>是打开的）。默认避免清理压缩超过50%的日志。这个比率绑定了备份日志所消耗的最大空间（50%的日志备份时压缩率为50%）。更高的比率则意味着浪费消耗更少，也就可以更有效的清理更多的空间。这项设置在每个topic设置中可以覆盖。 查看<a href="http://kafka.apache.org/documentation.html#topic-config" target="_blank" rel="noopener noreferrer">the per-topic  configuration section<svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a>。</td></tr> <tr><td>log.flush.scheduler.interval.ms</td> <td>Long.MaxValue</td> <td>检查是否需要fsync的时间间隔</td></tr> <tr><td>log.flush.interval.ms</td> <td>Long.MaxValue</td> <td>仅仅通过interval来控制消息的磁盘写入时机，是不足的，这个数用来控制”fsync“的时间间隔，如果消息量始终没有达到固化到磁盘的消息数，但是离上次磁盘同步的时间间隔达到阈值，也将触发磁盘同步。</td></tr> <tr><td>log.delete.delay.ms</td> <td>60000</td> <td>文件在索引中清除后的保留时间，一般不需要修改</td></tr> 
<tr><td>auto.create.topics.enable</td> <td>true</td> <td>是否允许自动创建topic。如果为true，则produce或者fetch 不存在的topic时，会自动创建这个topic。否则需要使用命令行创建topic</td></tr> <tr><td>controller.socket.timeout.ms</td> <td>30000</td> <td>partition管理控制器进行备份时，socket的超时时间。</td></tr> <tr><td>controller.message.queue.size</td> <td>Int.MaxValue</td> <td>controller-to-broker-channels的buffer 尺寸</td></tr> <tr><td>default.replication.factor</td> <td>1</td> <td>默认备份份数，仅指自动创建的topics</td></tr> <tr><td>replica.lag.time.max.ms</td> <td>10000</td> <td>如果一个follower在这个时间内没有发送fetch请求，leader将从ISR中移除这个follower，并认为这个follower已经挂了</td></tr> <tr><td>replica.lag.max.messages</td> <td>4000</td> <td>如果一个replica没有备份的条数超过这个数值，则leader将移除这个follower，并认为这个follower已经挂了</td></tr> <tr><td>replica.socket.timeout.ms</td> <td>30*1000</td> <td>leader 备份数据时的socket网络请求的超时时间</td></tr> <tr><td>replica.socket.receive.buffer.bytes</td> <td>64*1024</td> <td>备份时向leader发送网络请求时的socket receive buffer</td></tr> <tr><td>replica.fetch.max.bytes</td> <td>1024*1024</td> <td>备份时每次fetch的最大值</td></tr> <tr><td>replica.fetch.wait.max.ms</td> <td>500</td> <td>leader发出备份请求时，数据到达leader的最长等待时间</td></tr> <tr><td>replica.fetch.min.bytes</td> <td>1</td> <td>备份时每次fetch之后回应的最小尺寸</td></tr> <tr><td>num.replica.fetchers</td> <td>1</td> <td>从leader备份数据的线程数</td></tr> <tr><td>replica.high.watermark.checkpoint.interval.ms</td> <td>5000</td> <td>每个replica检查是否将最高水位进行固化的频率</td></tr> <tr><td>fetch.purgatory.purge.interval.requests</td> <td>1000</td> <td>fetch 请求清除时的清除间隔</td></tr> <tr><td>producer.purgatory.purge.interval.requests</td> <td>1000</td> <td>producer请求清除时的清除间隔</td></tr> <tr><td>zookeeper.session.timeout.ms</td> <td>6000</td> <td>zookeeper会话超时时间。</td></tr> <tr><td>zookeeper.connection.timeout.ms</td> <td>6000</td> <td>客户端等待和zookeeper建立连接的最大时间</td></tr> <tr><td>zookeeper.sync.time.ms</td> <td>2000</td> <td>zk follower落后于zk leader的最长时间</td></tr> <tr><td>controlled.shutdown.enable</td> <td>true</td> 
<td>是否能够控制broker的关闭。如果能够，broker将可以移动所有leaders到其他的broker上，在关闭之前。这减少了不可用性在关机过程中。</td></tr> <tr><td>controlled.shutdown.max.retries</td> <td>3</td> <td>在执行不彻底的关机之前，可以成功执行关机的命令数。</td></tr> <tr><td>controlled.shutdown.retry.backoff.ms</td> <td>5000</td> <td>在关机之间的backoff时间</td></tr> <tr><td>auto.leader.rebalance.enable</td> <td>true</td> <td>如果这是true，控制者将会自动平衡brokers对于partitions的leadership</td></tr> <tr><td>leader.imbalance.per.broker.percentage</td> <td>10</td> <td>每个broker所允许的leader最大不平衡比率</td></tr> <tr><td>leader.imbalance.check.interval.seconds</td> <td>300</td> <td>检查leader不平衡的频率</td></tr> <tr><td>offset.metadata.max.bytes</td> <td>4096</td> <td>允许客户端保存他们offsets的最大个数</td></tr> <tr><td>max.connections.per.ip</td> <td>Int.MaxValue</td> <td>每个ip地址上每个broker可以被连接的最大数目</td></tr> <tr><td>max.connections.per.ip.overrides</td> <td></td> <td>每个ip或者hostname默认的连接的最大覆盖</td></tr> <tr><td>connections.max.idle.ms</td> <td>600000</td> <td>空连接的超时限制</td></tr> <tr><td>log.roll.jitter.{ms,hours}</td> <td>0</td> <td>从logRollTimeMillis抽离的jitter最大数目</td></tr> <tr><td>num.recovery.threads.per.data.dir</td> <td>1</td> <td>每个数据目录用来日志恢复的线程数目</td></tr> <tr><td>unclean.leader.election.enable</td> <td>true</td> <td>指明了是否能够使不在ISR中replicas设置用来作为leader</td></tr> <tr><td><strong>delete.topic.enable</strong></td> <td>false</td> <td>启用 deletetopic 参数，建议设置为 true</td></tr> <tr><td>offsets.topic.num.partitions</td> <td>50</td> <td>The number of partitions for the offset commit topic. 
Since changing  this after deployment is currently unsupported, we recommend using a  higher setting for production (e.g., 100-200).</td></tr> <tr><td>offsets.topic.retention.minutes</td> <td>1440</td> <td>存在时间超过这个时间限制的offsets都将被标记为待删除</td></tr> <tr><td>offsets.retention.check.interval.ms</td> <td>600000</td> <td>offset管理器检查陈旧offsets的频率</td></tr> <tr><td>offsets.topic.replication.factor</td> <td>3</td> <td>topic的offset的备份份数。建议设置更高的数字保证更高的可用性</td></tr> <tr><td>offset.topic.segment.bytes</td> <td>104857600</td> <td>offsets topic的segment尺寸。</td></tr> <tr><td>offsets.load.buffer.size</td> <td>5242880</td> <td>这项设置与批量尺寸相关，当从offsets segment中读取时使用。</td></tr> <tr><td>offsets.commit.required.acks</td> <td>-1</td> <td>在offset  commit可以接受之前，需要设置确认的数目，一般不需要更改</td></tr></tbody></table> <h5 id="producer-配置信息"><a href="#producer-配置信息" class="header-anchor">#</a> Producer 配置信息</h5> <table><thead><tr><th>Property</th> <th>Default</th> <th>Description</th></tr></thead> <tbody><tr><td><strong>metadata.broker.list</strong></td> <td></td> <td>启动时 producer 查询 brokers 的列表，可以是集群中所有 brokers 的一个子集。注意，这 个参数只是用来获取 topic 的元信息用， producer 会从元信息中挑选合适的 broker 并 与 之 建 立 socket 连 接 。 格 式 是 ： host1:port1,host2:port2。</td></tr> <tr><td>request.required.acks</td> <td>0</td> <td>此配置是表明当一次produce请求被认为完成时的确认值。特别是，多少个其他brokers必须已经提交了数据到他们的log并且向他们的leader确认了这些信息。典型的值包括： 0： 表示producer从来不等待来自broker的确认信息（和0.7一样的行为）。这个选择提供了最小的时延但同时风险最大（因为当server宕机时，数据将会丢失）。 1：表示获得leader replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。 -1：producer会获得所有同步replicas都收到数据的确认。同时时延最大，然而，这种方式并没有完全消除丢失消息的风险，因为同步replicas的数量可能是1.如果你想确保某些replicas接收到数据，那么你应该在topic-level设置中选项min.insync.replicas设置一下。请阅读一下设计文档，可以获得更深入的讨论。</td></tr> <tr><td>request.timeout.ms</td> <td>10000</td> <td>broker尽力实现request.required.acks需求时的等待时间，否则会发送错误到客户端</td></tr> <tr><td>producer.type</td> <td>sync</td> <td>此选项置顶了消息是否在后台线程中异步发送。正确的值： （1）  async： 异步发送 （2）  sync： 同步发送 通过将producer设置为异步，我们可以批量处理请求（有利于提高吞吐率）但是这也就造成了客户端机器丢掉未发送数据的可能性</td></tr> <tr><td>serializer.class</td> 
<td>kafka.serializer.DefaultEncoder</td> <td>消息的序列化类别。默认编码器输入一个字节byte[]，然后返回相同的字节byte[]</td></tr> <tr><td>key.serializer.class</td> <td></td> <td>关键字的序列化类。如果没给与这项，默认情况是和消息一致</td></tr> <tr><td>partitioner.class</td> <td>kafka.producer.DefaultPartitioner</td> <td>partitioner 类，用于在subtopics之间划分消息。默认partitioner基于key的hash表</td></tr> <tr><td>compression.codec</td> <td>none</td> <td>此项参数可以设置压缩数据的codec，可选codec为：“none”， “gzip”， “snappy”</td></tr> <tr><td>compressed.topics</td> <td>null</td> <td>此项参数可以设置某些特定的topics是否进行压缩。如果压缩codec是NoCompressionCodec之外的codec，则对指定的topics数据应用这些codec。如果压缩topics列表是空，则将特定的压缩codec应用于所有topics。如果压缩的codec是NoCompressionCodec，压缩对所有topics均不可用。</td></tr> <tr><td>message.send.max.retries</td> <td>3</td> <td>此项参数将使producer自动重试失败的发送请求。此项参数将指定重试的次数。注意：设定非0值可能在某些网络错误下导致消息重复：即消息已经发送成功但确认丢失的情况</td></tr> <tr><td>retry.backoff.ms</td> <td>100</td> <td>在每次重试之前，producer会更新相关topic的metadata，以此进行查看新的leader是否分配好了。因为leader的选择需要一点时间，此选项指定更新metadata之前producer需要等待的时间。</td></tr> <tr><td>topic.metadata.refresh.interval.ms</td> <td>600*1000</td> <td>producer一般会在某些失败的情况下（partition  missing，leader不可用等）更新topic的metadata。他将会规律的循环。如果你设置为负值，metadata只有在失败的情况下才更新。如果设置为0，metadata会在每次消息发送后就会更新（不建议这种选择，系统消耗太大）。重要提示： 更新只有在消息发送后才会发生，因此，如果producer从来不发送消息，则metadata从来也不会更新。</td></tr> <tr><td>queue.buffering.max.ms</td> <td>5000</td> <td>当应用async模式时，用于缓存数据的最大时间间隔。例如，设置为100时，将会批量处理100ms之内消息。这将改善吞吐率，但是会增加由于缓存产生的延迟。</td></tr> <tr><td>queue.buffering.max.messages</td> <td>10000</td> <td>当使用async模式时，在producer必须被阻塞或者数据必须丢失之前，可以缓存到队列中的未发送的最大消息条数</td></tr> <tr><td>batch.num.messages</td> <td>200</td> <td>使用async模式时，可以批量处理消息的最大条数。或者消息数目已到达这个上限或者是queue.buffering.max.ms到达，producer才会处理</td></tr> <tr><td>send.buffer.bytes</td> <td>100*1024</td> <td>socket 写缓存尺寸</td></tr> <tr><td>client.id</td> <td>“”</td> <td>这个client  id是用户特定的字符串，在每次请求中包含用来追踪调用，他应该逻辑上可以确认是哪个应用发出了这个请求。</td></tr></tbody></table> <h5 id=""><a href="#" class="header-anchor">#</a></h5> <h5 id="consumer-配置信息"><a href="#consumer-配置信息" 
class="header-anchor">#</a> Consumer 配置信息</h5> <table><thead><tr><th>Property</th> <th>Default</th> <th>Description</th></tr></thead> <tbody><tr><td>group.id</td> <td></td> <td>用来唯一标识consumer进程所在组的字符串，如果设置同样的group  id，表示这些processes都是属于同一个consumer  group</td></tr> <tr><td>zookeeper.connect</td> <td></td> <td>指定zookeeper的连接的字符串，格式是hostname:port，此处host和port都是zookeeper  server的host和port，为避免某个zookeeper 机器宕机之后失联，你可以指定多个hostname:port，使用逗号作为分隔： hostname1:port1,hostname2:port2,hostname3:port3 可以在zookeeper连接字符串中加入zookeeper的chroot路径，此路径用于存放他自己的数据，方式： hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</td></tr> <tr><td>consumer.id</td> <td>null</td> <td>不需要设置，一般自动产生</td></tr> <tr><td>socket.timeout.ms</td> <td>30*1000</td> <td>网络请求的超时限制。真实的超时限制是   max.fetch.wait+socket.timeout.ms</td></tr> <tr><td>socket.receive.buffer.bytes</td> <td>64*1024</td> <td>socket用于接收网络请求的缓存大小</td></tr> <tr><td>fetch.message.max.bytes</td> <td>1024*1024</td> <td>每次fetch请求中，针对每次fetch消息的最大字节数。这些字节将会读入用于每个partition的内存中，因此，此设置将会控制consumer所使用的memory大小。这个fetch请求尺寸必须至少和server允许的最大消息尺寸相等，否则，producer可能发送的消息尺寸大于consumer所能消耗的尺寸。</td></tr> <tr><td>num.consumer.fetchers</td> <td>1</td> <td>用于fetch数据的fetcher线程数</td></tr> <tr><td>auto.commit.enable</td> <td>true</td> <td>如果为真，consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时，由新的consumer使用</td></tr> <tr><td>auto.commit.interval.ms</td> <td>60*1000</td> <td>consumer向zookeeper提交offset的频率，单位是毫秒</td></tr> <tr><td>queued.max.message.chunks</td> <td>2</td> <td>用于缓存的消息chunk的最大数目，以供consumption。每个chunk最大可达fetch.message.max.bytes</td></tr> <tr><td>rebalance.max.retries</td> <td>4</td> <td>当新的consumer加入到consumer  group时，consumers集合试图重新平衡分配到每个consumer的partitions数目。如果consumers集合改变了，当分配正在执行时，这个重新平衡会失败并重试</td></tr> <tr><td>fetch.min.bytes</td> <td>1</td> <td>每次fetch请求时，server应该返回的最小字节数。如果没有足够的数据返回，请求会等待，直到足够的数据才会返回。</td></tr> <tr><td>fetch.wait.max.ms</td> <td>100</td> <td>如果没有足够的数据能够满足fetch.min.bytes，则此项配置是指在应答fetch请求之前，server会阻塞的最大时间。</td></tr> 
<tr><td>rebalance.backoff.ms</td> <td>2000</td> <td>在重试rebalance之前backoff时间</td></tr> <tr><td>refresh.leader.backoff.ms</td> <td>200</td> <td>在试图确定某个partition的leader是否失去他的leader地位之前，需要等待的backoff时间</td></tr> <tr><td>auto.offset.reset</td> <td>largest</td> <td>zookeeper中没有初始化的offset时，如果offset是以下值的回应： smallest：自动复位offset为smallest的offset largest：自动复位offset为largest的offset anything  else：向consumer抛出异常</td></tr> <tr><td>consumer.timeout.ms</td> <td>-1</td> <td>如果没有消息可用，即使等待特定的时间之后也没有，则抛出超时异常</td></tr> <tr><td>exclude.internal.topics</td> <td>true</td> <td>是否将内部topics的消息暴露给consumer</td></tr> <tr><td>partition.assignment.strategy</td> <td>range</td> <td>选择向consumer 流分配partitions的策略，可选值：range，roundrobin</td></tr> <tr><td>client.id</td> <td>group id value</td> <td>是用户特定的字符串，用来在每次请求中帮助跟踪调用。它应该可以逻辑上确认产生这个请求的应用</td></tr> <tr><td>zookeeper.session.timeout.ms</td> <td>6000</td> <td>zookeeper 会话的超时限制。如果consumer在这段时间内没有向zookeeper发送心跳信息，则它会被认为挂掉了，并且rebalance将会产生</td></tr> <tr><td>zookeeper.connection.timeout.ms</td> <td>6000</td> <td>客户端在建立与zookeeper连接中的最大等待时间</td></tr> <tr><td>zookeeper.sync.time.ms</td> <td>2000</td> <td>ZK follower可以落后ZK leader的最大时间</td></tr> <tr><td>offsets.storage</td> <td>zookeeper</td> <td>用于存放offsets的地点： zookeeper或者kafka</td></tr> <tr><td>offsets.channel.backoff.ms</td> <td>1000</td> <td>重新连接offsets channel或者是重试失败的offset的fetch/commit请求的backoff时间</td></tr> <tr><td>offsets.channel.socket.timeout.ms</td> <td>10000</td> <td>当读取offset的fetch/commit请求回应的socket 超时限制。此超时限制是被consumerMetadata请求用来请求offset管理</td></tr> <tr><td>offsets.commit.max.retries</td> <td>5</td> <td>重试offset commit的次数。这个重试次数只应用于shut-down期间的offset  commits。它不适用于由自动提交线程发起的commit</td></tr> <tr><td>dual.commit.enabled</td> <td>true</td> <td>如果使用“kafka”作为offsets.storage，你可以二次提交offset到zookeeper(还有一次是提交到kafka）。在zookeeper-based的offset   storage到kafka-based的offset storage迁移时，这是必须的。对任意给定的consumer   group来说，比较安全的建议是当完成迁移之后就关闭这个选项</td></tr> <tr><td>partition.assignment.strategy</td> <td>range</td> 
<td>在“range”和“roundrobin”策略之间选择一种作为分配partitions给consumer 数据流的策略；  循环的partition分配器分配所有可用的partitions以及所有可用consumer   线程。它会将partition循环的分配到consumer线程上。如果所有consumer实例的订阅都是确定的，则partitions的划分是确定的分布。循环分配策略只有在以下条件满足时才可以：（1）每个topic在每个consumer实例上都有同样数量的数据流。（2）订阅的topic的集合对于consumer   group中每个consumer实例来说都是确定的。</td></tr></tbody></table></div> <footer class="page-edit" style="display:none;"><!----> <!----></footer> <!----> <!----> <!----></main> <!----></div></div></div></div><div class="global-ui"><div class="back-to-ceiling" style="right:1rem;bottom:6rem;width:2.5rem;height:2.5rem;border-radius:.25rem;line-height:2.5rem;display:none;" data-v-db14854a data-v-db14854a><svg t="1574745035067" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="5404" class="icon" data-v-db14854a><path d="M526.60727968 10.90185116a27.675 27.675 0 0 0-29.21455937 0c-131.36607665 82.28402758-218.69155461 228.01873535-218.69155402 394.07834331a462.20625001 462.20625001 0 0 0 5.36959153 69.94390903c1.00431239 6.55289093-0.34802892 13.13561351-3.76865779 18.80351572-32.63518765 54.11355614-51.75690182 118.55860487-51.7569018 187.94566865a371.06718723 371.06718723 0 0 0 11.50484808 91.98906777c6.53300375 25.50556257 41.68394495 28.14064038 52.69160883 4.22606766 17.37162448-37.73630017 42.14135425-72.50938081 72.80769204-103.21549295 2.18761121 3.04276886 4.15646224 6.24463696 6.40373557 9.22774369a1871.4375 1871.4375 0 0 0 140.04691725 5.34970492 1866.36093723 1866.36093723 0 0 0 140.04691723-5.34970492c2.24727335-2.98310674 4.21612437-6.18497483 6.3937923-9.2178004 30.66633723 30.70611158 55.4360664 65.4791928 72.80769147 103.21549355 11.00766384 23.91457269 46.15860503 21.27949489 52.69160879-4.22606768a371.15156223 371.15156223 0 0 0 11.514792-91.99901164c0-69.36717486-19.13165746-133.82216804-51.75690182-187.92578088-3.42062944-5.66790279-4.76302748-12.26056868-3.76865837-18.80351632a462.20625001 462.20625001 0 0 0 
5.36959269-69.943909c-0.00994388-166.08943902-87.32547796-311.81420293-218.6915546-394.09823051zM605.93803103 357.87693858a93.93749974 93.93749974 0 1 1-187.89594924 6.1e-7 93.93749974 93.93749974 0 0 1 187.89594924-6.1e-7z" p-id="5405" data-v-db14854a></path><path d="M429.50777625 765.63860547C429.50777625 803.39355007 466.44236686 1000.39046097 512.00932183 1000.39046097c45.56695499 0 82.4922232-197.00623328 82.5015456-234.7518555 0-37.75494459-36.9345906-68.35043303-82.4922232-68.34111062-45.57627738-0.00932239-82.52019037 30.59548842-82.51086798 68.34111062z" p-id="5406" data-v-db14854a></path></svg></div><!----></div></div>
    <script src="/assets/js/app.447d4224.js" defer></script><script src="/assets/js/3.9d76740c.js" defer></script><script src="/assets/js/1.c4fd7d2e.js" defer></script><script src="/assets/js/151.971fdf4b.js" defer></script>
  </body>
</html>
